package com.katesoft.scale4j.rttp.subscription;

import com.hazelcast.core.DistributedTask;
import com.hazelcast.core.ExecutionCallback;
import com.katesoft.scale4j.common.model.IPrettyPrintableObject;
import com.katesoft.scale4j.persistent.enums.CrudType;
import com.katesoft.scale4j.persistent.model.unified.AbstractPersistentEntity;
import com.katesoft.scale4j.rttp.internal.RttpHazelcastBridgeAwareImpl;
import net.jcip.annotations.ThreadSafe;
import org.apache.commons.lang.builder.ReflectionToStringBuilder;
import org.hibernate.event.PostDeleteEvent;
import org.hibernate.event.PostDeleteEventListener;
import org.hibernate.event.PostInsertEvent;
import org.hibernate.event.PostInsertEventListener;
import org.hibernate.event.PostUpdateEvent;
import org.hibernate.event.PostUpdateEventListener;

import java.io.Serializable;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;

import static com.katesoft.scale4j.persistent.enums.CrudType.CREATE;
import static com.katesoft.scale4j.persistent.enums.CrudType.DELETE;
import static com.katesoft.scale4j.persistent.enums.CrudType.UPDATE;
import static com.katesoft.scale4j.rttp.internal.IRttpHazelcastRefs.SUBSCRIBER_EXECUTOR;

/**
 * This class allows to subscribe for insert, update, delete events of any domain entity you like.
 *
 * @author kate2007
 */
@ThreadSafe
public class CloudSubscriber extends RttpHazelcastBridgeAwareImpl
    implements PostInsertEventListener, PostUpdateEventListener, PostDeleteEventListener, Serializable
{
    private Collection<ICloudSubscription<? super AbstractPersistentEntity>> cloudSubscriptions;

    public CloudSubscriber()
    {
        cloudSubscriptions = new LinkedHashSet<ICloudSubscription<? super AbstractPersistentEntity>>(0);
    }

    @Override
    public final void onPostInsert(PostInsertEvent event)
    {
        if (event.getEntity() instanceof AbstractPersistentEntity) {
            AbstractPersistentEntity obj = (AbstractPersistentEntity) event.getEntity();
            logger.debug("received db insert for entity[class=%s, uid=%s]", obj.getPersistentClass().getSimpleName(), obj.getUniqueIdentifier());
            submitEvent(CREATE, obj);
        }
    }

    @Override
    public final void onPostUpdate(PostUpdateEvent event)
    {
        if (event.getEntity() instanceof AbstractPersistentEntity) {
            AbstractPersistentEntity obj = (AbstractPersistentEntity) event.getEntity();
            logger.debug("received db update for entity[class=%s, uid=%s]", obj.getPersistentClass().getSimpleName(), obj.getUniqueIdentifier());
            submitEvent(UPDATE, obj);
        }
    }

    @Override
    public final void onPostDelete(PostDeleteEvent event)
    {
        if (event.getEntity() instanceof AbstractPersistentEntity) {
            AbstractPersistentEntity obj = (AbstractPersistentEntity) event.getEntity();
            logger.debug("received db delete for entity[class=%s, uid=%s]", obj.getPersistentClass().getSimpleName(), obj.getUniqueIdentifier());
            submitEvent(DELETE, obj);
        }
    }

    /**
     * this method will create dedicated distributed thread for each business process that evaluated successfully by predicate and submit distributed task to
     * hazelcast engine.
     *
     * @param type crud event type
     * @param obj  domain entity
     */
    protected void submitEvent(CrudType type,
                               AbstractPersistentEntity obj)
    {
        for (ICloudSubscription<? super AbstractPersistentEntity> next : cloudSubscriptions) {
            IUnaryPredicate<? super AbstractPersistentEntity> predicate = next.getPredicate();
            Class<? super AbstractPersistentEntity> subscriptionClass = next.getSubscriptionClass();
            if (subscriptionClass.isAssignableFrom(obj.getClass())) {
                Boolean b = predicate.execute(obj);
                logger.debug("%s.execute(%s/%s) = %s", predicate, obj.getClass().getSimpleName(), obj.getGlobalUniqueIdentifier(), b.toString());
                Collection<? extends IBusinessProcess<? super AbstractPersistentEntity>> processes = next.getProcesses();
                logger.debug("invoking %s business processes for entity(%s/%s)", processes.size(), obj.getPersistentClass().getSimpleName(),
                             obj.getGlobalUniqueIdentifier());
                final CountDownLatch latch = new CountDownLatch(processes.size());
                for (IBusinessProcess<? super AbstractPersistentEntity> process : processes) {
                    logger.debug("%s.process(%s/%s)", process, obj.getPersistentClass().getSimpleName(), obj.getGlobalUniqueIdentifier());
                    BusinessServiceProcess serviceProcess = new BusinessServiceProcess(process, predicate, obj, type);
                    DistributedTask<Void> task = new DistributedTask<Void>(serviceProcess, null);
                    task.setExecutionCallback(new ExecutionCallback<Void>()
                    {
                        @Override
                        public void done(Future<Void> voidFuture)
                        {
                            latch.countDown();
                        }
                    });
                    bridge.getHazelcastInstance().getExecutorService(SUBSCRIBER_EXECUTOR).execute(task);
                }
                try {
                    latch.await();
                    logger.debug("execution of %s finished", processes);
                }
                catch (InterruptedException e) {
                    logger.error(e);
                }
            }
        }
    }

    private static class BusinessServiceProcess implements Runnable, Serializable, IPrettyPrintableObject
    {
        private IBusinessProcess<? super AbstractPersistentEntity> process;
        private IUnaryPredicate<? super AbstractPersistentEntity> predicate;
        private AbstractPersistentEntity entity;
        private CrudType crudType;

        private BusinessServiceProcess(IBusinessProcess<? super AbstractPersistentEntity> process,
                                       IUnaryPredicate<? super AbstractPersistentEntity> predicate,
                                       AbstractPersistentEntity entity,
                                       CrudType crudType)
        {
            this.process = process;
            this.predicate = predicate;
            this.entity = entity;
            this.crudType = crudType;
        }

        @SuppressWarnings({"unchecked"})
        @Override
        public void run()
        {
            process.process(new SubscriptionEvent(crudType, entity));
        }

        @Override
        public String toString()
        {
            return String.format("%s.%s", predicate.getClass().getSimpleName(), process.getClass().getSimpleName());
        }

        @Override
        public String reflectionToString()
        {
            return ReflectionToStringBuilder.reflectionToString(this);
        }
    }

    /**
     * inject cloud subscribers. this is required field.
     *
     * @param cloudSubscriptions collection of client subscribers.
     */
    public void setCloudSubscriptions(Collection<ICloudSubscription<? super AbstractPersistentEntity>> cloudSubscriptions)
    {
        this.cloudSubscriptions = cloudSubscriptions;
    }

    public Collection<ICloudSubscription<? super AbstractPersistentEntity>> getCloudSubscriptions()
    {
        return cloudSubscriptions;
    }
}
